feat: add EvaluationClient with run() for on-demand session evaluation #300

aidandaly24 merged 3 commits into main
Conversation
Manual Integration Test Script

Save as a temporary script. This test invokes 20 turns to trigger batching (>10 trace IDs), waits 180s for CW ingestion, then runs `EvaluationClient.run()`:

```python
"""Temporary real test for EvaluationClient.run() batching — delete after testing."""

import json
import logging
import time
import uuid

import boto3

from bedrock_agentcore.evaluation import EvaluationClient

logging.basicConfig(level=logging.DEBUG)
logging.getLogger("botocore").setLevel(logging.WARNING)
logging.getLogger("boto3").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)

AGENT_ARN = "arn:aws:bedrock-agentcore:us-west-2:363376058968:runtime/HealthcareAgent_HealthCareAgent-Pv2decFQqQ"
AGENT_ID = "HealthcareAgent_HealthCareAgent-Pv2decFQqQ"
REGION = "us-west-2"


def invoke_agent(session_id: str, prompt: str) -> str:
    dp_client = boto3.client("bedrock-agentcore", region_name=REGION)
    payload = json.dumps({"prompt": prompt}).encode()
    response = dp_client.invoke_agent_runtime(
        agentRuntimeArn=AGENT_ARN, runtimeSessionId=session_id, payload=payload,
    )
    raw_output = response["response"].read().decode("utf-8")
    # The runtime streams SSE lines; collect the "data: " chunks into one string.
    text_parts = []
    for line in raw_output.splitlines():
        if line.startswith("data: "):
            chunk = line[len("data: "):]
            if chunk.startswith('"') and chunk.endswith('"'):
                chunk = json.loads(chunk)
            text_parts.append(chunk)
    return "".join(text_parts) if text_parts else raw_output


TURNS = [
    "What are the symptoms of the flu?",
    "How is the flu treated?",
    "When should I see a doctor for the flu?",
    "What causes high blood pressure?",
    "What are the symptoms of diabetes?",
    "How is type 2 diabetes diagnosed?",
    "What are common treatments for asthma?",
    "What causes migraines?",
    "How can I prevent heart disease?",
    "What are the side effects of ibuprofen?",
    "What is the difference between a cold and the flu?",
    "How does pneumonia spread?",
    "What vaccines do adults need?",
    "What are the early signs of arthritis?",
    "How is strep throat diagnosed?",
    "What causes kidney stones?",
    "How can I lower my cholesterol naturally?",
    "What are the symptoms of anemia?",
    "How is a urinary tract infection treated?",
    "What are the warning signs of a stroke?",
]


def main():
    session_id = f"test-batch-{uuid.uuid4()}"
    print(f"Session ID: {session_id}")
    print(f"Turns: {len(TURNS)}")
    for i, prompt in enumerate(TURNS):
        print(f"\n  Turn {i+1}/20: {prompt}")
        response = invoke_agent(session_id, prompt)
        print(f"  Response: {response[:150]}...")
    print("\n--- Waiting 180s for spans to land in CloudWatch ---")
    time.sleep(180)
    print(f"\n{'='*60}")
    print("Running EvaluationClient.run()")
    print(f"{'='*60}")
    client = EvaluationClient(region_name=REGION)
    results = client.run(
        evaluator_ids=["Builtin.Helpfulness"],
        session_id=session_id,
        agent_id=AGENT_ID,
    )
    print(f"\n--- Results ({len(results)} total) ---")
    for r in results:
        print(json.dumps(r, indent=4, default=str))


if __name__ == "__main__":
    main()
```

Expected output
aidandaly24 left a comment:

Looks good to me, very clean PR. Two small nit comments, but approved.
EvaluationClient collects spans from CloudWatch and calls the evaluate API with level-aware batching (SESSION/TRACE/TOOL_CALL). Accepts evaluator_ids, session_id, and agent_id or log_group_name. Auto-derives log group from agent_id, caches evaluator level lookups, and batches evaluate requests at max 10 target IDs per request.
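The max-10-target-IDs batching rule described above can be sketched as follows. This is a minimal illustration, not the actual `EvaluationClient` internals; `batch_target_ids` is a hypothetical helper name.

```python
# Sketch of the "max 10 target IDs per evaluate request" rule.
# batch_target_ids is a hypothetical helper, not the real client code.
MAX_TARGETS_PER_REQUEST = 10


def batch_target_ids(target_ids, batch_size=MAX_TARGETS_PER_REQUEST):
    """Split target IDs into chunks of at most batch_size per evaluate request."""
    return [target_ids[i:i + batch_size] for i in range(0, len(target_ids), batch_size)]


# 20 trace IDs (as in the manual test above) -> two evaluate requests of 10 each
batches = batch_target_ids([f"trace-{i}" for i in range(20)])
print([len(b) for b in batches])  # [10, 10]
```

A SESSION-level evaluator needs no such chunking (one session ID per run), which is why the batching has to be level-aware.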
```python
for evaluator_id in evaluator_ids:
    level = self._get_evaluator_level(evaluator_id)
    logger.info("Evaluating with %s (level=%s)", evaluator_id, level)
    requests = self._build_requests_for_level(evaluator_id, level, base_input, spans)
```
_build_requests_for_level raises ValueError when spans have no trace/tool IDs, but that exception isn't caught here — only the evaluate() call below is wrapped in try/except. So a TRACE evaluator with no trace IDs crashes the entire run(), while an API error just logs a warning and continues to the next evaluator. Could we wrap this call in the same try/except, or have _build_requests_for_level return [] + log a warning instead of raising?
You're right. I'll just remove the try/except from the for loop, so if anything fails, the function errors out instead of swallowing the error.
Remove try/except around evaluate() so errors propagate to the caller instead of being silently swallowed. Simplify _extract_trace_ids with dict.fromkeys(), inline _batch() into list comprehensions, and remove the evaluator_result_count tracking variable.
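The two simplifications mentioned in the commit message, ordered dedup via `dict.fromkeys()` and inlining `_batch()` as a list comprehension, can be sketched like this. The span field name `"traceId"` is illustrative, not the actual span schema.

```python
def extract_trace_ids(spans):
    """Deduplicate trace IDs while preserving first-seen order.

    dict.fromkeys() keeps insertion order (guaranteed since Python 3.7),
    so it acts as an order-preserving set.
    """
    return list(dict.fromkeys(span["traceId"] for span in spans))


def batch(ids, size=10):
    """The former _batch() helper, expressed as a single list comprehension."""
    return [ids[i:i + size] for i in range(0, len(ids), size)]


spans = [{"traceId": "t1"}, {"traceId": "t2"}, {"traceId": "t1"}, {"traceId": "t3"}]
print(extract_trace_ids(spans))  # ['t1', 't2', 't3']
```

Compared with a plain `set()`, `dict.fromkeys()` keeps the trace IDs in the order they were observed, which keeps evaluate requests deterministic.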
aidandaly24 left a comment:

Thanks for making the changes, looks good to me.
Summary

- `EvaluationClient` with `run()` method that collects spans from CloudWatch and calls the evaluate API with level-aware batching (SESSION/TRACE/TOOL_CALL)
- `_agent_span_collector` package with `CloudWatchAgentSpanCollector` for span collection with retry/polling
- `query_string` and `end_time` parameters added to `CloudWatchSpanHelper` to support collector delegation

Details

- `run()` accepts evaluator_ids, session_id, and agent_id or log_group_name
- Log group auto-derived from agent_id as `/aws/bedrock-agentcore/runtimes/{agent_id}-DEFAULT`
- Span query filters on `attributes.session.id` + `ispresent(scope.name)`

Test plan

- `python -m pytest tests/bedrock_agentcore/evaluation/test_client.py -v` (35 tests)
- `python -m pytest tests/bedrock_agentcore/evaluation/ -v` (111 tests)